Add serialize #22
Conversation
Codecov Report
❌ Patch coverage is …
Additional details and impacted files

@@           Coverage Diff            @@
##              main      #22   +/-   ##
=========================================
  Coverage         ?   77.91%
=========================================
  Files            ?       22
  Lines            ?     1028
  Branches         ?        0
=========================================
  Hits             ?      801
  Misses           ?      227
  Partials         ?        0

☔ View full report in Codecov by Sentry.
Pull Request Overview
This PR adds serialization functionality to the sparrow_ipc library, implementing the ability to serialize Arrow record batches into binary format. This complements the existing deserialization capabilities.
- Implements comprehensive serialization functionality for Arrow record batches and schemas
- Adds new test case to verify round-trip serialization/deserialization consistency
- Introduces utility functions for checking record batch consistency and formatting
Reviewed Changes
Copilot reviewed 8 out of 8 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
| tests/test_deserialization_with_files.cpp | Added new test case for serialization round-trip testing and comparison utility function |
| src/serialize_utils.cpp | Core serialization implementation with utilities for converting record batches to binary format |
| include/sparrow_ipc/serialize_utils.hpp | Header file defining serialization utility functions and their documentation |
| include/sparrow_ipc/serialize.hpp | Main serialization API for collections of record batches |
| include/sparrow_ipc/utils.hpp | Added record batch consistency checking template function |
| src/encapsulated_message.cpp | Minor bug fix for condition checking |
| include/sparrow_ipc/magic_values.hpp | Removed empty line for formatting consistency |
| CMakeLists.txt | Updated build configuration to include new serialization source files |
    {
        return {};
    }
    if (!utils::check_record_batches_consistency(record_batches))
I think you want a streaming interface at some point, so that not all record batches have to be materialized before serializing.
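Something along these lines, say — just a rough sketch, where the stream_serializer name and the include paths are assumptions, namespace qualifiers are omitted, and the body simply reuses the serialize_schema_message / serialize_record_batch helpers added in this PR:

    #include <cstdint>
    #include <ostream>
    #include <vector>

    #include "sparrow_ipc/magic_values.hpp"
    #include "sparrow_ipc/serialize_utils.hpp"

    // Hypothetical incremental writer: batches are serialized one at a time and
    // pushed to the sink, so the whole stream never lives in memory at once.
    class stream_serializer
    {
    public:

        explicit stream_serializer(std::ostream& sink)
            : m_sink(sink)
        {
        }

        // Writes the schema message before the first batch, then one
        // encapsulated message per record batch.
        void write(const sparrow::record_batch& rb)
        {
            if (!m_schema_written)
            {
                write_buffer(serialize_schema_message(rb));
                m_schema_written = true;
            }
            write_buffer(serialize_record_batch(rb));
        }

        // Appends the end-of-stream marker.
        void close()
        {
            m_sink.write(reinterpret_cast<const char*>(end_of_stream.data()),
                         static_cast<std::streamsize>(end_of_stream.size()));
        }

    private:

        void write_buffer(const std::vector<std::uint8_t>& buffer)
        {
            m_sink.write(reinterpret_cast<const char*>(buffer.data()),
                         static_cast<std::streamsize>(buffer.size()));
        }

        std::ostream& m_sink;
        bool m_schema_written = false;
    };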
Yes, but it's a first step; we will support streaming later in another PR.
Ok, but all the internals in the PR are architected around the idea of serializing all batches at once to a single vector. So you'll have to refactor all of this...
The "low level" methods work on single batches, so I guess we could add a method accepting a single batch that would directly call them. Or if we can optimize this implementation such that serializing a vector of a single record_batch is equivalent to serializing a record_batch directly, we can add something like:
    std::vector<uint8_t> serialize(const record_batch& rb)
    {
        return serialize(std::ranges::single_view(rb));
    }
This is addressed in the next PR
    std::vector<uint8_t> serialized_schema = serialize_schema_message(record_batches[0]);
    std::vector<uint8_t> serialized_record_batches = serialize_record_batches(record_batches);
    serialized_schema.insert(
        serialized_schema.end(),
        std::make_move_iterator(serialized_record_batches.begin()),
        std::make_move_iterator(serialized_record_batches.end())
    );
    // End of stream message
    serialized_schema.insert(serialized_schema.end(), end_of_stream.begin(), end_of_stream.end());
I think this is going to make several memory copies because of resizes along the way:
- copies when appending each serialized record batch to the previous one (unless you're extremely careful to presize the output buffer to the right length)
- a copy when appending the serialized batches to the serialized schema
So not only do you materialize the entire IPC stream in memory, but there are also intermediate copies involved. This is probably not going to perform very well on very large data.
Given that IPC is typically written to a file or socket, this seems suboptimal.
It's definitely sub-optimal; the point is only to have a first milestone where we are able to follow the serialization specification.
Can you add a TODO: rewrite for performance, or something similar? We should also open an issue to track this once the PR is merged.
A possible optimization could be to have the serialize_xxx implementation methods accept a std::vector<std::uint8_t>&, or an insert_iterator, instead of returning a std::vector<std::uint8_t>. This way, the "driving" method could allocate a single vector and reserve additional memory between calls to the lower-layer methods when possible (this can be done in a dedicated PR when optimizing the performance).
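For illustration, a minimal sketch of that shape — the *_to names and exact signatures are hypothetical; only the idea of appending into a single caller-owned buffer matters:

    // Hypothetical variants of the low-level helpers: they append their bytes to
    // an existing buffer instead of returning a new std::vector each time.
    void serialize_schema_message_to(const sparrow::record_batch& record_batch,
                                     std::vector<std::uint8_t>& out);
    void serialize_record_batch_to(const sparrow::record_batch& record_batch,
                                   std::vector<std::uint8_t>& out);

    template <class R>
    [[nodiscard]] std::vector<std::uint8_t> serialize(const R& record_batches)
    {
        std::vector<std::uint8_t> out;
        if (record_batches.empty())
        {
            return out;
        }
        // The driving method owns the single output buffer; reserve() calls can
        // be inserted here once message sizes are known or can be estimated.
        serialize_schema_message_to(record_batches[0], out);
        for (const sparrow::record_batch& rb : record_batches)
        {
            serialize_record_batch_to(rb, out);
        }
        // End-of-stream marker, as in the current implementation.
        out.insert(out.end(), end_of_stream.begin(), end_of_stream.end());
        return out;
    }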
This is addressed in a follow-up PR
This is addressed in the next PR
    const size_t first_rb_nb_columns = first_rb.nb_columns();
    for (const sparrow::record_batch& rb : record_batches)
    {
        const auto rb_nb_columns = rb.nb_columns();
I'm surprised you don't have a schema comparison function to automate all this instead of comparing columns one by one?
We have schema comparison, but our record_batch doesn't use ArrowSchema.
Same question as here
     * with continuation bytes, 4-byte length prefix, schema data, and 8-byte alignment padding
     */
    [[nodiscard]] SPARROW_IPC_API std::vector<uint8_t>
    serialize_schema_message(const sparrow::record_batch& record_batch);
I'm curious: you don't have a sparrow::schema abstraction that can be passed here instead of a record batch?
Our record_batch doesn't use ArrowSchema; we have to transform it into a Struct array first.
Now that your PR adding extraction to batches has been merged, we could release sparrow and rebase this PR on it? This can also be done in a future PR.
I don't think that we should extract it to a struct array in that case.
     * in Arrow IPC format, ready for transmission or storage
     */
    [[nodiscard]] SPARROW_IPC_API std::vector<uint8_t>
    serialize_record_batch(const sparrow::record_batch& record_batch);
How does this work for dictionary arrays? (or it doesn't?)
We don't support it currently
        const auto value_offset = builder.CreateString(std::string(value));
        kv_offsets.push_back(org::apache::arrow::flatbuf::CreateKeyValue(builder, key_offset, value_offset));
    }
    return builder.CreateVector(kv_offsets);
Suggested change:
    - return builder.CreateVector(kv_offsets);
    + return builder.CreateVector(std::move(kv_offsets));
        flatbuffers::Offset<org::apache::arrow::flatbuf::Field> field = create_field(builder, child);
        children_vec.emplace_back(field);
    }
    return children_vec.empty() ? 0 : builder.CreateVector(children_vec);
Same here and below: move children_vec
CreateVector takes const ref
    const auto fields_vec = create_children(schema_builder, record_batch.columns());
    const auto schema_offset = org::apache::arrow::flatbuf::CreateSchema(
        schema_builder,
        org::apache::arrow::flatbuf::Endianness::Little,  // TODO: make configurable
Rather than configurable, it should probably mirror the machine's endianness?
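Something like the following, assuming the generated Arrow flatbuffers headers are already included and expose both Little and Big (C++20 std::endian):

    #include <bit>

    // Report the endianness of the machine doing the serialization instead of
    // hard-coding Little.
    constexpr org::apache::arrow::flatbuf::Endianness native_endianness() noexcept
    {
        return std::endian::native == std::endian::little
                   ? org::apache::arrow::flatbuf::Endianness::Little
                   : org::apache::arrow::flatbuf::Endianness::Big;
    }

The CreateSchema call above could then take native_endianness() instead of the hard-coded value.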
src/serialize_utils.cpp (Outdated)
    utils::align_to_8(static_cast<int64_t>(schema_buffer.size()))
        - static_cast<int64_t>(schema_buffer.size()),
Don't you want some kind of helper function for the padding calculation? You're computing it in fill_body as well.
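For example, a tiny shared helper (name hypothetical) next to align_to_8:

    // Number of padding bytes needed to reach the next 8-byte boundary
    // (0 when the size is already a multiple of 8).
    [[nodiscard]] constexpr std::size_t padding_to_8(std::size_t size) noexcept
    {
        return (8u - (size % 8u)) % 8u;
    }

    // Usage at both call sites, e.g.:
    // output.insert(output.end(), padding_to_8(schema_buffer.size()), 0);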
done
    std::vector<uint8_t> body = generate_body(record_batch);
    output.insert(output.end(), std::make_move_iterator(body.begin()), std::make_move_iterator(body.end()));
This is making another copy...
    }
    }

    // Helper function to create a simple ArrowSchema for testing
Nitpick: not indented correctly.
Ah, the indentation here doesn't seem to have changed.
Looking at the linux job (from source) failures, it seems we're missing …
As mentioned in one of the comments, a possible way to improve performance and limit the allocations is to have the different methods accept a std::vector or an iterator instead of returning a std::vector. This would make it easy to have different high-level APIs (one returning a single vector, one returning a vector of vectors if that seems more appropriate, one accepting a stream that would call the underlying method with a stream_iterator, etc.). This refactoring can be done in a dedicated PR once we have a reference implementation to test against.
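As a sketch of that layering — serialize_to and both front ends below are hypothetical names, and the body simply reuses the per-batch helpers from this PR:

    #include <algorithm>
    #include <cstdint>
    #include <iterator>
    #include <ostream>
    #include <vector>

    // Generic core: writes the full IPC stream through any output iterator.
    template <class R, std::output_iterator<std::uint8_t> Out>
    Out serialize_to(const R& record_batches, Out out)
    {
        if (record_batches.empty())
        {
            return out;
        }
        const std::vector<std::uint8_t> schema = serialize_schema_message(record_batches[0]);
        out = std::ranges::copy(schema, out).out;
        for (const sparrow::record_batch& rb : record_batches)
        {
            const std::vector<std::uint8_t> batch = serialize_record_batch(rb);
            out = std::ranges::copy(batch, out).out;
        }
        return std::ranges::copy(end_of_stream, out).out;
    }

    // High-level front end returning a single buffer.
    template <class R>
    std::vector<std::uint8_t> serialize(const R& record_batches)
    {
        std::vector<std::uint8_t> buffer;
        serialize_to(record_batches, std::back_inserter(buffer));
        return buffer;
    }

    // High-level front end writing straight to a stream opened in binary mode.
    template <class R>
    void serialize(const R& record_batches, std::ostream& stream)
    {
        serialize_to(record_batches, std::ostream_iterator<char>(stream));
    }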
include/sparrow_ipc/magic_values.hpp (Outdated)
     * https://arrow.apache.org/docs/format/Columnar.html#encapsulated-message-format
     */
    - constexpr std::array<uint8_t, 4> continuation = {0xFF, 0xFF, 0xFF, 0xFF};
    + constexpr std::array<std::uint8_t, 4> continuation = {0xFF, 0xFF, 0xFF, 0xFF};
Suggested change:
    - constexpr std::array<std::uint8_t, 4> continuation = {0xFF, 0xFF, 0xFF, 0xFF};
    + inline constexpr std::array<std::uint8_t, 4> continuation = {0xFF, 0xFF, 0xFF, 0xFF};
include/sparrow_ipc/magic_values.hpp (Outdated)
     * https://arrow.apache.org/docs/format/Columnar.html#ipc-streaming-format
     */
    - constexpr std::array<uint8_t, 8> end_of_stream = {0xFF, 0xFF, 0xFF, 0xFF, 0x00, 0x00, 0x00, 0x00};
    + constexpr std::array<std::uint8_t, 8> end_of_stream = {0xFF, 0xFF, 0xFF, 0xFF, 0x00, 0x00, 0x00, 0x00};
Suggested change:
    - constexpr std::array<std::uint8_t, 8> end_of_stream = {0xFF, 0xFF, 0xFF, 0xFF, 0x00, 0x00, 0x00, 0x00};
    + inline constexpr std::array<std::uint8_t, 8> end_of_stream = {0xFF, 0xFF, 0xFF, 0xFF, 0x00, 0x00, 0x00, 0x00};
     * @note The function reserves memory for the vector based on the metadata size for
     * optimal performance.
     */
    [[nodiscard]] SPARROW_IPC_API
I agree, this dependency should not leak. Given that this function and create_field are used in serialize_utils only, they should not be declared here. Besides, their visibility should be private/hidden.
If there is a need to test them, then their visibility can be conditionally changed to public (only when building the tests for instance), and they should be declared as extern in the test library.
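For instance — the SPARROW_IPC_BUILD_TESTS / SPARROW_IPC_TEST_API macro names and the create_field signature below are placeholders, not the actual code:

    // Export internals only when the tests are being built; otherwise they keep
    // default (hidden) visibility and stay out of the public headers.
    #if defined(SPARROW_IPC_BUILD_TESTS)
    #    define SPARROW_IPC_TEST_API SPARROW_IPC_API
    #else
    #    define SPARROW_IPC_TEST_API
    #endif

    // Declared in a private header (e.g. src/serialize_utils_private.hpp), not in
    // include/sparrow_ipc/serialize_utils.hpp.
    SPARROW_IPC_TEST_API flatbuffers::Offset<org::apache::arrow::flatbuf::Field>
    create_field(flatbuffers::FlatBufferBuilder& builder, const sparrow::array& column);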
     * @note The function uses move iterators to efficiently transfer the serialized data
     * from individual record batches to the output vector.
     */
    [[nodiscard]] std::vector<uint8_t> serialize_record_batches(const R& record_batches)
I find the name a bit confusing; at first sight, this method looks very similar to the public API serialize(const R& record_batches). Even if the documentation explains the differences in detail, the names should reflect these differences so that a reader can quickly see that these methods are different.
This will be done in the next PR
Done in the next PR
Let's open some issues to track what will be fixed in upcoming PRs when this is merged.
Commits: Add documentation, wip add tests, cleaning, Address review, Fix, fix compilation, try fix formatting, review
Force-pushed from 8fc0f0f to 444482d.
Alright, issues have been opened to track unaddressed comments here; let's merge.